package persistent.prestige.console.threadmode.t3;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReentrantLock;

import persistent.prestige.utils.LogUtils;

/**
 * Nio 线程，专门负责nio read,write 本类是实例行代码，不会对nio,断线重连，写半包等场景进行处理,旨在理解
 * Reactor模型（多线程版本）
 * 
 * @author dingwei2
 * 
 */
public class SubReactorThread extends Thread {

	private Selector selector;
	private ExecutorService businessExecutorPool;

	private List<NioTask> taskList = new ArrayList<NioTask>(512);
	private ReentrantLock taskMainLock = new ReentrantLock();

	// 处理网络读写
	private IoHandler ioHandler = new IoHandler();

	/**
	 * 业务线程池
	 * 
	 * @param businessExecutorPool
	 */
	public SubReactorThread(ExecutorService businessExecutorPool) {
		try {
			this.businessExecutorPool = businessExecutorPool;
			this.selector = Selector.open();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	/**
	 * socket channel
	 * 
	 * @param socketChannel
	 */
	public void register(NioTask task) {
		if (task != null) {
			try {
				taskMainLock.lock();
				taskList.add(task);
			} finally {
				taskMainLock.unlock();
			}
		}
	}

	// private

	public void run() {
		while (!Thread.interrupted()) {
			Set<SelectionKey> ops = null;
			try {
				selector.select(1000);
				ops = selector.selectedKeys();
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
				continue;
			}

			// 处理相关事件
			for (Iterator<SelectionKey> it = ops.iterator(); it.hasNext();) {
				SelectionKey key = it.next();
				it.remove();

				try {
					if (key.isWritable()) { // 向客户端发送请求
						
					} else if (key.isReadable()) { // 接受客户端请求
						
						LogUtils.log("服务端处理读事件");
						
						ioHandler.read(key);
					}
				} catch (Throwable e) {
					e.printStackTrace();
					System.out.println("客户端主动断开连接。。。。。。。");
				}

			}

			// 注册事件
			if (!taskList.isEmpty()) {
				try {
					taskMainLock.lock();
					for (Iterator<NioTask> it = taskList.iterator(); it.hasNext();) {
						NioTask task = it.next();
						try {
							SocketChannel sc = (SocketChannel)task.getSc().channel();
							if (task.getData() != null) {
								sc.register(selector, task.getOp(), task.getData());
							} else {
								sc.register(selector, task.getOp());
							}

						} catch (Throwable e) {
							e.printStackTrace();// ignore
						}
						it.remove();
					}

				} finally {
					taskMainLock.unlock();
				}
			}

		}
	}

	/**
	 * 此处的reqBuffer处于可写状态
	 * 
	 * @param sc
	 * @param reqBuffer
	 */
	private void dispatch(SocketChannel sc, ByteBuffer reqBuffer) {
		businessExecutorPool.submit(new Handler(sc, reqBuffer, this));
	}

	
}